package com.guandata.cdc.model.sink.sink_table_name_strategy;

import com.guandata.cdc.model.sink.SinkTableNameStrategy;
import org.apache.flink.api.java.utils.ParameterTool;

public class PrefixSinkTableNameStrategy implements SinkTableNameStrategy {
    private static final String PREFIX_NAME_TEMPLATE = "%s%s";

    private final String prefix;
    private final String tableName;

    public PrefixSinkTableNameStrategy(ParameterTool params) {
        this.prefix = params.get("sink.table.prefix", "");
        this.tableName = params.getRequired("source.table");
    }

    @Override
    public String getSinkTableName() {
        return String.format(PREFIX_NAME_TEMPLATE, prefix, tableName);
    }
}
